
3.3 - Code - The Worker Bot Environment

This file provides the complete implementation for our first reinforcement learning task. It contains two key classes: WorkerBot, the BotAI that executes within the game, and WorkerEnv, the gymnasium.Env that provides the interface to the stable-baselines3 agent.

This code is a direct translation of the design specification from the previous section.

Implementation Overview

WorkerBot (The Game Actor):

  • Initializes with communication queues.
  • On a throttled loop (every eighth game step, i.e. when iteration % 8 == 0), it blocks while waiting for an action from the agent; the agent-side half of this handshake is sketched just after this list.
  • Executes the received action.
  • Calculates the reward and the next observation.
  • Puts the (obs, reward, terminated, ...) tuple back on the queue for the agent.
  • Manages the episode termination condition.
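
For context, the other half of that handshake lives in the SC2GymEnv base class from the previous section. The sketch below is illustrative only (the real implementation is in sc2_gym_env.py); it shows roughly what the environment's step() does while WorkerBot is blocked on its action queue:

    def step(self, action):
        # Hand the chosen action to WorkerBot.on_step, which is blocked on action_queue.get().
        self.action_queue.put(action)
        # Block until the bot has executed the action and pushed back the result tuple.
        obs, reward, terminated, truncated, info = self.obs_queue.get()
        return obs, reward, terminated, truncated, info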

WorkerEnv (The Environment Interface):

  • Inherits from our reusable SC2GymEnv.
  • Formally defines the action_space and observation_space to match our design.

worker_bot.py

import numpy as np
from gymnasium.spaces import Box, Discrete
import multiprocessing as mp
from queue import Empty

from sc2.bot_ai import BotAI
from sc2.ids.unit_typeid import UnitTypeId
from sc2_gym_env import SC2GymEnv, ObservationQueueItem

# --- The BotAI Implementation (Runs in the Game Process) ---

class WorkerBot(BotAI):
"""
The BotAI actor that executes actions and generates observations.
Its only goal is to learn the correct policy for building workers.
"""
def __init__(self, action_queue: mp.Queue, obs_queue: mp.Queue[ObservationQueueItem]):
super().__init__()
self.action_queue = action_queue
self.obs_queue = obs_queue

    def _handle_action(self, action: int) -> float:
        """
        Executes the agent's action and calculates the sparse reward.
        Returns the reward for the action.
        """
        can_afford_scv = self.can_afford(UnitTypeId.SCV)
        has_idle_cc = self.townhalls.idle.exists

        if action == 1:  # Action: Build SCV
            if can_afford_scv and has_idle_cc:
                self.train(UnitTypeId.SCV)
                return 5.0  # Positive reward for a correct action
            else:
                return -5.0  # Negative reward for an impossible/wasted action
        return 0.0  # No reward for the "Do Nothing" action

    async def on_step(self, iteration: int):
        """
        The main game loop, throttled to interact with the agent every 8 steps.
        """
        if iteration % 8 != 0:
            return

        try:
            # 1. GET ACTION - This is a blocking call, waiting for the RL agent
            action = self.action_queue.get(timeout=1)

            # 2. EXECUTE ACTION & GET SPARSE REWARD
            reward = self._handle_action(action)

            # 3. ADD DENSE REWARD for progress
            reward += self.workers.amount * 0.1

            # 4. GET OBSERVATION for the next state
            observation = np.array([
                self.minerals / 1000.0,
                self.workers.amount / 50.0,
                self.supply_left / 20.0
            ], dtype=np.float32)

            # 5. DEFINE TERMINATION CONDITION
            terminated = self.workers.amount >= 20

            # 6. SEND DATA - Put the results on the queue for the RL agent
            self.obs_queue.put((observation, reward, terminated, False, {}))

            if terminated:
                await self.client.leave()

        except Empty:
            # This can happen if the training process is killed.
            print("Action queue was empty. Assuming training has ended.")
            await self.client.leave()
            return


# --- The Gymnasium Environment (Runs in the Main Process) ---

class WorkerEnv(SC2GymEnv):
"""
The Gymnasium Wrapper for the WorkerBot.

This class defines the action and observation spaces that are visible
to the stable-baselines3 agent.
"""
def __init__(self):
# Pass our custom BotAI class and a map name to the parent.
super().__init__(bot_class=WorkerBot, map_name="AcropolisLE")

# The agent can choose between two actions: 0 or 1.
self.action_space = Discrete(2)

# The observation is a 1D array of 3 normalized float values.
self.observation_space = Box(
low=0.0,
high=np.inf,
shape=(3,),
dtype=np.float32
)